package com.skyline.courier.net.provider.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.net.SocketAddress;
import java.util.Map;

import com.skyline.courier.net.AbstractConnection;
import com.skyline.courier.net.Connection;
import com.skyline.courier.net.SendFuture;
import com.skyline.courier.net.TransferBean;

public class NettyConnection extends AbstractConnection implements Connection {
	public static final AttributeKey<Map<Long, SendFuture<?>>> FUTURES_ATTRIBUTE_KEY = new AttributeKey<Map<Long, SendFuture<?>>>(
			"futures");
	private Channel channel;
	private Bootstrap bootstrap;

	public NettyConnection(Bootstrap bootstrap) {
		this.bootstrap = bootstrap;
	}
	
	protected synchronized void doConnect(final SocketAddress address) {
		ChannelFuture channelFuture = bootstrap.connect(address);
		channelFuture.addListener(new ChannelFutureListener() {
			public void operationComplete(ChannelFuture future) throws Exception {
				if (!future.isSuccess() && connectionManager != null) {
					// 将该地址加入重连列表
					connectionManager.removeConnected(address);
					connectionManager.addDisconnectAddress(address, NettyConnection.this);
				}
			}
		});
		channelFuture.syncUninterruptibly();
		channel = channelFuture.channel();
	}

	@Override
	public boolean isOpen() {
		return channel.isActive();
	}

	@Override
	protected <O> void doSend(TransferBean input, final SendFuture<O> invokeFuture) {
		if (!isOpen()) { // 判断重连
			reconnect();
		}
		
		// 将futures设置attribute
		Attribute<Map<Long, SendFuture<?>>> attribute = channel.attr(FUTURES_ATTRIBUTE_KEY);
		attribute.set(futures);
		
		// 发送input对象
		ChannelFuture channelFuture = channel.writeAndFlush(input);
		channelFuture.addListener(new ChannelFutureListener() {
			public void operationComplete(ChannelFuture future) throws Exception {
				if (!future.isSuccess()) {
					invokeFuture.setCause(future.cause());
				}
			}
		});
	}

	@Override
	protected void doReconnect() {
		doConnect(address);
	}

	@Override
	protected void doClose() {
		channel.close().syncUninterruptibly();
	}

}
